;; This file is part of GNUnet.
;; Copyright (C) 2012-2019, 2021 GNUnet e.V.
;;
;; GNUnet is free software: you can redistribute it and/or modify it
;; under the terms of the GNU Affero General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; GNUnet is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
;;
;; SPDX-License-Identifier: AGPL-3.0-or-later

;; Author: Florian Dold
;; Author: Maxime Devos
;; C file: util/mq.c
;; Scheme module: (gnu gnunet mq)
;;
;; A message queue for GNUnet messages.
;; Messages are made of bytes. In particular,
;; messages must be prefixed by a /:message-header.
(define-library (gnu gnunet mq)
  (export <message-queue> make-message-queue message-queue?
	  make-one-by-one-sender
	  inject-message! inject-error! send-message!
	  message-queue-length
	  %message-queue-garbagitude
	  try-send-again!
	  close-queue!

	  &missing-header-error make-missing-header-error
	  missing-header-error? missing-header-error-received-size
	  &size-mismatch-error make-size-mismatch-error
	  size-mismatch-error? size-mismatch-error-expected-size
	  size-mismatch-error-received-size

	  &overly-full-queue-warning
	  make-overly-full-queue-warning overly-full-queue-warning?
	  overly-full-queue-current-length overly-full-queue-suspicious-when

	  ;; Can be adjusted for debugging -- no guarantees it
	  ;; will not be removed!
	  %suspicious-length)
  (import (gnu gnunet mq handler)
	  (gnu gnunet mq envelope)
	  (gnu gnunet utils hat-let)
	  (only (gnu gnunet utils bv-slice)
		slice-slice slice-length slice?)
	  (only (gnu gnunet util struct)
		/:message-header)
	  (only (gnu gnunet netstruct syntactic)
		sizeof read%)
	  (only (guile) lambda* define* exact-integer?)
	  (only (ice-9 weak-vector)
		weak-vector weak-vector-ref)
	  (only (ice-9 atomic)
		make-atomic-box atomic-box-ref
		atomic-box-compare-and-swap!)
	  (only (rnrs base)
		lambda assert let begin define
		procedure? eq? >= = <= < if quote
		values and let* not cons car cdr
		cond + - > * apply)
	  (only (rnrs control)
		when unless)
	  (only (rnrs conditions)
		define-condition-type &warning &error
		make-who-condition condition)
	  (only (rnrs exceptions)
		raise raise-continuable)
	  (only (rnrs records syntactic) define-record-type)
	  (only (srfi srfi-1) filter)
	  (only (srfi srfi-8) receive)
	  (only (srfi srfi-39) make-parameter)
	  (prefix (only (pfds queues)
			make-queue dequeue enqueue queue-length
			queue-empty? queue->list list->queue)
		  #{pfds:}#))
  (begin
    (define-record-type (<message-queue> make-message-queue message-queue?)
      (fields (immutable handlers message-queue-handlers)
	      (immutable error-handler message-queue-error-handler)
	      ;; Atomic box of a queue of messages to send (as @code{<envelope>}
	      ;; objects), together with an over-estimate of how many items in
	      ;; the queue are already cancelled, used as a heuristic for when
	      ;; optimising the message queue is required.
	      ;;
	      ;; It can occassionally be an under-estimate due to marking
	      ;; envelopes as cancelled and updating the estimate not being
	      ;; an atomic operation.
	      (immutable messages+garbage/box message-queue-messages+garbage/box)
	      ;; A procedure for actually sending the messages.
	      ;; It accepts a single argument, the message queue.
	      ;;
	      ;; It is run each time a message a message has been
	      ;; enqueued. It is not obligated to send the messages
	      ;; right now, though it probably should send them
	      ;; soonish. It can be run at any time, with
	      ;; @code{try-send-again!}.
	      (immutable sender message-queue-sender)
	      ;; A thunk to ‘close’ the message queue, usually telling the
	      ;; remote peer that no additional data will be transmitted
	      ;; in either direction and stopping associated threads.
	      (immutable closer message-queue-closer))
      (protocol
       (lambda (%make)
	 (lambda* (handlers error-handler sender #:optional (closer values))
	   "Make a message queue with message handlers @var{handlers}.

The message handlers are expected to handle bytevector slices
that start with a @code{/:message-header}. The index of the message
handler is the ‘message type’.  Note that, unlike in the C implementation,
messages are not serialised.  As such, some synchronisation or punting
messages onto a separate thread may be necessary.

Injected errors are passed to @var{error-handler}, a variadic procedure.
A list of possible errors can be found in the manual.

Messages are sent with @var{sender}. It can be created with
@code{make-one-by-one-sender}.  Optionally, a @var{closer} procedure can
be passed.  Such a procedure is expected to be idempotent, see
@code{close-queue!} for details."
	   ;; Predicate does not exist yet ...
	   #;(assert (message-handlers? handlers))
	   #;(assert (message-handler? error-handler))
	   (assert (procedure? closer))
	   (%make handlers error-handler
		  (make-atomic-box (cons (pfds:make-queue) 0))
		  sender
		  closer)))))

    (define (make-one-by-one-sender proc)
      "Make a message sender, sending messages one-by-one with @var{proc}.

The procedure @var{proc} must accept a single argument,
the message envelope to send. This procedure should
use @code{attempt-irrevocable-sent!} when it feels ready.
It must not return any values currently.

The message does not need to be send directly.
However, remember that unless the priority allows otherwise,
messages must be sent in-order (TODO really received in-order?)."
      (assert (procedure? proc))
      (lambda (mq)
	(assert (message-queue? mq))
	(%%bind-atomic-boxen
	 ((queue+garbage (message-queue-messages+garbage/box mq) swap!))
	 ;; First extract an envelope ...
	 (let spin ((old queue+garbage))
	   (define old-queue (car old))
	   (define old-garbage (cdr old))
	   (assert (<= 0 old-garbage))
	   ;; ... unless there isn't anything to remove anymore.
	   ;; This check cannot be moved outside the (let spin ...),
	   ;; as message senders may be called at any time
	   ;; (even if there are no messages!). Also, in case of
	   ;; concurrency, the queue may become empty after a spin
	   ;; iteration.
	   (unless (pfds:queue-empty? old-queue)
	     (receive (envelope new-queue) (pfds:dequeue old-queue)
	       (cond ((envelope-peek-cancelled? envelope)
		      ;; There is no need to pass already cancelled
		      ;; envelopes to @var{proc} (although passing them
		      ;; anyway should be harmless), so remove them
		      ;; from the queue. Also try to keep the estimate
		      ;; accurate.
		      (swap! old (cons new-queue (- old-garbage 1))))
		     ;; We extracted a (not-yet-cancelled) envelope.
		     ;; Now do something with it!
		     ((eq? old (swap! old (cons new-queue old-garbage)))
		      ;; Make sure @var{proc} does not return
		      ;; any values, as we may want to assign
		      ;; meaning to return values later.
		      (receive ()
			  ;; Process the message.
			  (proc envelope)
			(values))))
	       ;; Process remaining messages (or retry in case there was
	       ;; a race and we lost it).
	       ;; TODO: if someone else modified the message queue,
	       ;; does that mean we don't have to anymore?
	       (spin queue+garbage)))))))

    (define (inject-message! mq message)
      "Call the message handler that was registered
for the type of the message @var{mq} in the message queue var{mq}
with the message @var{message}. In case the message is malformed
(according to the message handler), inject a @code{logic:ill-formed}
error instead.  In case no appropriate message handler exists,
inject a @code{logic:no-handler} error instead.

It is an error for @var{message} to be so small it doesn't have
a @code{/:message-header}. Likewise, it is also an error for the
size in the message header not to correspond to the size of the
slice @var{message}.  In the first case, a @code{&missing-header-error}
is raised.  In the second case, a @code{&size-mismatch-error} is raised.

In both cases, a @code{&who} condition with as value @code{inject-message!}
(a symbol) is included as well.

This procedure is intended to be used by the implementation
of message queues."
      (let^ ((! message-header-size (sizeof /:message-header '()))
	     (! message-size (slice-length message))
	     (? (< message-size message-header-size)
		(raise (condition
			(make-missing-header-error message-size)
			(make-who-condition 'inject-message!))))
	     (! header
		(slice-slice message 0 (sizeof /:message-header '())))
	     (! type (read% /:message-header '(type) header))
	     (! supposed-size (read% /:message-header '(size) header))
	     (? (not (= message-size supposed-size))
		(raise (condition
			(make-size-mismatch-error supposed-size message-size)
			(make-who-condition 'inject-message!))))
	     ;; #f if does not exist
	     (! handler (message-handler-for
			 (message-queue-handlers mq)
			 type))
	     (? (not handler)
		(inject-error! mq 'logic:no-handler type))
	     (? (not (verify-message? handler message))
		(inject-error! mq 'logic:ill-formed type)))
	    (handle-message! handler message)))

    (define (inject-error! mq key . rest)
      "Inject the error @code{key . rest} in the message queue @var{mq}.

This is meant to be used by the message queue implementation,
e.g. in response to an I/O error, although in principle it can be used by
the user of the message queue as well.  Whether the message queue is still
usable when this procedure is called, depends on the message queue
implementation and injected error."
      (apply (message-queue-error-handler mq) key rest))

    (define (message-queue-length mq)
      "How many messages are currently in the message queue @var{mq}?"
      (pfds:queue-length
       (car (atomic-box-ref (message-queue-messages+garbage/box mq)))))

    (define (%message-queue-garbagitude mq)
      "Return the estimated amount of cancelled envelopes. This procedure
is not part of the API and is only intended for the test suite."
      (cdr (atomic-box-ref (message-queue-messages+garbage/box mq))))

    (define-condition-type &missing-header-error &error
      make-missing-header-error missing-header-error?
      (received-size missing-header-error-received-size))

    (define-condition-type &size-mismatch-error &error
      make-size-mismatch-error size-mismatch-error?
      (expected-size size-mismatch-error-expected-size)
      (received-size size-mismatch-error-received-size))

    (define-condition-type &overly-full-queue-warning &warning
      make-overly-full-queue-warning overly-full-queue-warning?
      (current-length  overly-full-queue-current-length)
      (suspicious-when overly-full-queue-suspicious-when))

    (define %suspicious-length
      (make-parameter 10000))

    (define* (send-message! mq message #:key (priority 0)
			    (notify-sent! values))
      "Send a message with the given message queue. A continuable
@code{&warning} may be raised, e.g. a @code{&overly-full-queue-warning}
in case the queue is suspiciously long. The message queue implementation
can raise errors of its own as well, as usual.

@var{priority} is a numeric priority-preference value for @var{message},
from @code{(gnu gnunet mq prio-prefs)}. By default, @var{message} will be
sent as reliable background traffic (@code{prio:background}).

The in-order sending of ordered messages (when requested by @var{priority})
is only guaranteed when supported by the message queue implementation
and when @code{try-send-again!} and @code{send-message!} are not being
used concurrently on the same message queue.

When the message has been irrevocabily sent, the thunk @var{notify-sent!}
will be called.

After normal execution, the message envelope is returned,
but in case of an exception (for example, an out-of-memory exception during
the handling of a @code{&overly-full-queue-warning}), it is possible
the envelope isn't returned even though it has been enqueued and it might
perhaps be sent."
      (define mq/weak
	(let ((v (weak-vector mq)))
	  (lambda () (weak-vector-ref v 0))))
      (define (cancel!)
	(let ((mq (mq/weak)))
	  (if mq
	      (increment-garbage&maybe-cleanup mq)
	      (values))))
      (assert (and (slice? message)
		   (exact-integer? priority)
		   (<= 0 priority) (< priority 512)))
      (%%bind-atomic-boxen
       ((queue+garbage (message-queue-messages+garbage/box mq) swap!))
       ;; Add the message to the queue. Also remember the
       ;; length of the new queue; we'll need it later.
       (let* ((envelope (make-envelope cancel!
				       message
				       #:priority priority
				       #:notify-sent! notify-sent!))
	      (queue-length
	       (let spin ((old queue+garbage))
		 (let* ((old-queue (car old))
			(old-garbage (cdr old))
			(new-queue (pfds:enqueue old-queue envelope)))
		   (if (eq? old (swap! old (cons new-queue old-garbage)))
		       (pfds:queue-length new-queue)
		       (spin queue+garbage))))))
	 ;; The C implementation emits a warning if the queue has
	 ;; many entries, as this may indicate a bug (in the scheduler,
	 ;; in the queue implementation, ...). This seems a good idea.
	 (let ((suspicious-length (%suspicious-length)))
	   (when (>= queue-length suspicious-length)
	     (raise-continuable
	      (condition (make-overly-full-queue-warning
			  queue-length suspicious-length)
			 ;; TODO: consider
			 ;; (@ (gnu gnunet mq) send!) here and elsewhere.
			 (make-who-condition 'send-message!)))))
	 (try-send-again! mq)
	 envelope)))

    (define (try-send-again! mq)
      "Try to send messages in the queue @var{mq} that were not yet sent.
This is expected to be called from the message queue implementation."
      ((message-queue-sender mq) mq))

    (define (close-queue! mq)
      "Close the message queue @var{mq}.  The exact semantics are
implementation-dependent.  Conventionally, this frees resources such as
sockets and threads and is idempotent and non-blocking.  There is no
guarantee that all messages in the queue will be sent before closing."
      ((message-queue-closer mq)))

    (define (queue-filter ? queue)
      "Construct a queue, based on @var{queue}, restricted to elements
satisfying the predicate @var{?}."
      (pfds:list->queue (filter ? (pfds:queue->list queue))))

    (define (increment-garbage&maybe-cleanup mq)
      "Increment the garbage counter of @var{mq} and perhaps
take out the trash (i.e., cancelled envelopes still in the queue),
and if the trash is taken out, reset the garbage counter to zero,
as an atomic operation."
      (%%bind-atomic-boxen
       ((queue+garbage (message-queue-messages+garbage/box mq) swap!))
       (let loop ((old queue+garbage))
	 (let* ((old-queue (car old))
		(old-queue-length (pfds:queue-length old-queue))
		(old-garbage (cdr old))
		(incremented-garbage (+ 1 old-garbage)))
	   (assert (<= 0 old-garbage))
	   ;; If the messages in the queue are largely
	   ;; garbage, throw the garbage out.  The procedure
	   ;; choses to throw the garbage out if the (estimated)
	   ;; ratio of garbage to the queue length is more than
	   ;; 3/4.
	   ;;
	   ;; There are no deep theoretical reasons for choosing
	   ;; the ratio 3/4=0.75, only that it is between 1/2 and
	   ;; 1. Choosing a ratio seemed less arbitrary than, say,
	   ;; only collect garbage if the garbage exceeds some
	   ;; fixed amount.
	   (if (> (* 4 incremented-garbage) (* 3 old-queue-length))
	       ;; It is time to collect garbage!
	       ;; Construct a new queue with all garbage removed.
	       (let ((filtered (queue-filter
				(lambda (i)
				  (not (envelope-peek-cancelled? i)))
				old-queue)))
		 ;; Try to write this new queue,
		 ;; resetting the garbage counter.
		 (if (eq? old (swap! old (cons filtered 0)))
		     ;; All garbage has been thrown out! Done!
		     (values)
		     ;; We lost the race, try again!
		     (loop queue+garbage)))
	       ;; Not yet time for garbage collection,
	       ;; just increment the garbage counter
	       (if (eq? old (swap! old (cons old-queue incremented-garbage)))
		   ;; The garbage counter has been incremented! Done!
		   (values)
		   ;; We lost the race, try again!
		   (loop queue+garbage)))))))))
